Skip to content

[improve][fn] PIP-484: Expose incremental window events via IncrementalWindowFunction#25967

Open
Dream95 wants to merge 1 commit into
apache:masterfrom
Dream95:pip_incremental-window-function
Open

[improve][fn] PIP-484: Expose incremental window events via IncrementalWindowFunction#25967
Dream95 wants to merge 1 commit into
apache:masterfrom
Dream95:pip_incremental-window-function

Conversation

@Dream95

@Dream95 Dream95 commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Motivation

Pulsar Window Functions currently invoke WindowFunction.process(Collection<Record<X>>, ...) with all messages in the window on every trigger. Internally, WindowManager already classifies events into full, newly added, and expired lists on each activation, but WindowFunctionExecutor drops getNew() and getExpired() before calling the user function.
This makes incremental computation inefficient for sliding windows and forces users to manually diff full collections.

Modifications

This PR adds PIP-484, which proposes:

  • Promote the existing internal Window<T> interface to the public API (get(), getNew(), getExpired(), timestamps).
  • Add a new IncrementalWindowFunction<X, T> interface that receives Window<Record<X>> on each trigger.
  • Auto-detect the new interface in WindowFunctionExecutor with no new configuration.
  • Update Functions deployment validation (FunctionConfigUtils, FunctionCommon) to accept the new interface.
    Existing WindowFunction implementations remain fully backward compatible.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

…talWindowFunction

Signed-off-by: Dream95 <zhou_8621@163.com>
@Dream95 Dream95 changed the title [improve][pip] PIP-484: Expose incremental window events via IncrementalWindowFunction [improve][fn] PIP-484: Expose incremental window events via IncrementalWindowFunction Jun 8, 2026
@dao-jun

dao-jun commented Jun 10, 2026

Copy link
Copy Markdown
Member

overall LGTM, just 2 points:

  1. The FunctionCommon.getFunctionClassParent() change is more critical than the PIP describes — it's not about "keeping type inference correct," it's that without it, deployment will NPE (assuming doJavaChecks is fixed first). The call chain is getFunctionTypes → getFunctionClassParent → resolveInterfaceTypeArguments.
    When the parent interface is misidentified as java.util.function.Function, resolveInterfaceTypeArguments traverses the interface hierarchy and never finds Function (since IncrementalWindowFunction doesn't extend it), returns null, and typeArgsList.get(0) throws NPE. This should be documented as a hard dependency, not a
    correctness improvement.
  2. Window.get() / getNew() / getExpired() return mutable List. While each trigger creates fresh lists so mutations can't corrupt internal state, once this becomes a public API the Javadoc should explicitly state whether callers are allowed to modify the returned lists.

@david-streamlio david-streamlio left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of PIP-484. Overall this is a clear, well-scoped proposal — good background, an honest backward-compat analysis, a concrete example, and a diagram; it passes the "can a reader understand it without hours of code reading" health check. The inline comments below are (1) a few required template sections that are missing and (2) some public-API design points to nail down. I've left out the two points already raised on the PR (the getFunctionClassParent NPE dependency and list mutability).

Comment thread pip/pip-484.md

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

There is no wire-protocol change between Functions Workers. No special geo-replication considerations apply.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The template requires three sections that are currently missing — could you add them, even if brief?

  • Security Considerations — this is a pure API addition with no new endpoints, so a sentence confirming "no new REST/protocol surface, no new auth or multi-tenancy implications" is enough.
  • Monitoring / Metrics — please state explicitly "no new metrics; runtime behavior is unchanged."
  • Alternatives — the most important one. Why a new interface rather than (a) default methods on WindowFunction, (b) an overloaded process(Window, ...), or (c) a config flag? Documenting why these were rejected will pre-empt the obvious review questions. It's also the right place to defend the name IncrementalWindowFunction, since it exposes expired events too, not just increments.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option A seems better than the current approach of adding new interfaces. Let me think about whether there are any compatibility issues.

Comment thread pip/pip-484.md
}
```

The existing internal `Window.java` is replaced by a reference to the `api-java` interface (or removed entirely, with `WindowImpl` implementing the new public interface directly).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promoting an internal type to public API is exactly the surface the PIP process exists to scrutinize, so this shouldn't be left as an either/or ("replaced by a reference … or removed entirely"). Please commit to one approach and spell out what happens to any existing references to the old org.apache.pulsar.functions.windowing.Window (even though it's an internal package today).

Comment thread pip/pip-484.md
* @param inputWindow the window view for this activation, providing access to
* all current events ({@link Window#get()}),
* newly added events ({@link Window#getNew()}), and
* expired events ({@link Window#getExpired()}).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to the list-mutability question already raised on this PR, please document the lifetime of the Window reference: is it valid only during the process() call, or may a user retain it across triggers? Lifetime/ownership contracts matter once this interface is public.

Comment thread pip/pip-484.md
#### 3a. Add field

```java
protected IncrementalWindowFunction<T, X> incrementalWindowFunction;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The public interface is declared IncrementalWindowFunction<X, T> (X=input, T=output), but this executor field is <T, X>. This matches the internal WindowFunction<T,X> convention, so it's defensible — but the doc shows both orderings without comment, which will trip readers. A one-line note clarifying the convention would help.

Comment thread pip/pip-484.md
if (userClassObject instanceof java.util.function.Function) {
// existing logic, unchanged
bareWindowFunction = ...;
} else if (userClassObject instanceof IncrementalWindowFunction) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dispatch order here is FunctionIncrementalWindowFunctionWindowFunction. A user class implementing both IncrementalWindowFunction and WindowFunction (or both Function and IncrementalWindowFunction) resolves by this precedence. Since that becomes an observable public-API contract, please state the ordering explicitly and confirm it's intentional.

Comment thread pip/pip-484.md
|------|--------|
| `FunctionConfigUtils.doJavaChecks()` | Add `IncrementalWindowFunction` to the allowed user-class interfaces. |
| `FunctionCommon.getFunctionClassParent()` | When `windowConfig` is set, resolve `IncrementalWindowFunction` before `WindowFunction` so input/output type inference for SerDe and schema checks stays correct. |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required by the template, but reviewers usually ask: a sentence on intended test coverage (executor dispatch for each interface type, and deployment-validation acceptance of the new interface) would strengthen the proposal.

Comment thread pip/pip-484.md
| `List<T> getNew()` | Events added since the last trigger |
| `List<T> getExpired()` | Events removed since the last trigger |
| `Long getStartTimestamp()` | Window start time (non-null for time-based windows, otherwise `null`) |
| `Long getEndTimestamp()` | Window end time (watermark in event-time mode, system time in processing-time mode) |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getStartTimestamp() documents its null behavior, but getEndTimestamp()'s description implies it is never null. Please confirm and capture this in the Javadoc, since both methods are now public.

@david-streamlio

Copy link
Copy Markdown
Contributor

A few PIP-process items (separate from the proposal content):

  • PR title is [improve][fn], but the PIP README specifies [improve][pip] PIP-xxx: {title} for PIP PRs. Worth aligning (the semantic-PR CI may flag it).
  • No PIP label is applied yet — it's needed for the PIP index/tracking query.
  • Please confirm PIP-484 is still the next free number — numbers are assigned by open-PR order and can race with other in-flight PIPs.
  • The Links section still has TBD for the [DISCUSS]/[VOTE] mailing-list threads — expected at this stage, just a reminder to fill them in before the vote concludes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants